package drds.data_propagate.parse.multistage_coordinator;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import drds.data_propagate.binlog_event.BinLogEvent;
import drds.data_propagate.binlog_event.BinLogEventDecoder;
import drds.data_propagate.binlog_event.BinLogEventsContext;
import drds.data_propagate.binlog_event.Buffer;
import drds.data_propagate.binlog_event.event.rows_event.DeleteRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.UpdateRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.WriteRowsEvent;
import drds.data_propagate.entry.Entry;
import drds.data_propagate.parse.MessageEvent;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.table_meta_data.TableMetaData;

/**
 * Disruptor {@link EventHandler} that prepares a {@link MessageEvent} for later processing.
 *
 * <p>For each message it makes sure a decoded {@link BinLogEvent} is attached (decoding the raw
 * {@link Buffer} when necessary) and then, depending on the event type, either
 * <ul>
 *   <li>resolves the {@link TableMetaData} of a write/update/delete rows event and flags the
 *       message as needing DML parsing, or</li>
 *   <li>flags a rows-query event as needing DML parsing without resolving table metadata, or</li>
 *   <li>converts any other event into an {@link Entry} right away.</li>
 * </ul>
 *
 * <p>Any failure is wrapped in a {@link ParseException}, stored on the coordinator and rethrown.
 */
public class ParseStageHandler implements EventHandler<MessageEvent>, LifecycleAware {

    /** Owning coordinator; supplies the event converter and receives any failure. */
    private final MultistageCoordinatorImpl multiStageCoprocessor;
    /** Decoder used when a message arrives with a raw buffer but no decoded event. */
    private final BinLogEventDecoder binLogEventDecoder;
    /** Decoding context passed to every {@code decode} call. */
    private final BinLogEventsContext binLogEventsContext;

    /**
     * Creates the handler and, when the coordinator carries a GTID set, copies it into the
     * given decoding context.
     *
     * @param multiStageCoprocessor coordinator that owns this handler
     * @param binLogEventsContext   context handed to the decoder for every event
     */
    public ParseStageHandler(MultistageCoordinatorImpl multiStageCoprocessor, BinLogEventsContext binLogEventsContext) {
        this.multiStageCoprocessor = multiStageCoprocessor;
        // Decoder is configured with the range unknown_event .. enum_end_event.
        this.binLogEventDecoder = new BinLogEventDecoder(BinLogEvent.unknown_event, BinLogEvent.enum_end_event);
        this.binLogEventsContext = binLogEventsContext;
        if (multiStageCoprocessor.gtidSet != null) {
            binLogEventsContext.setGtidSet(multiStageCoprocessor.gtidSet);
        }
    }

    /**
     * Decodes the message's binlog event if needed and records on the message whether DML
     * parsing is required, together with the table metadata resolved for row events.
     *
     * @param messageEvent the message to process; updated in place
     * @param index        sequence of the message in the ring buffer (unused here)
     * @param endOfBatch   whether this is the last message of the current batch (unused here)
     * @throws Exception a {@link ParseException} wrapping whatever went wrong; the same
     *                   exception is also stored on the coordinator
     */
    @Override
    public void onEvent(MessageEvent messageEvent, long index, boolean endOfBatch) throws Exception {
        try {
            BinLogEvent binLogEvent = messageEvent.getBinLogEvent();
            if (binLogEvent == null) {
                // Not decoded yet: decode the raw buffer and attach the result to the message.
                Buffer buffer = messageEvent.getBuffer();
                binLogEvent = binLogEventDecoder.decode(buffer, binLogEventsContext);
                messageEvent.setBinLogEvent(binLogEvent);
            }

            int eventType = binLogEvent.getHeader().getEventType();
            TableMetaData tableMetaData = null;
            boolean needDmlParse = false;
            switch (eventType) {
                case BinLogEvent.write_rows_event_v1:
                case BinLogEvent.write_rows_event:
                    tableMetaData = multiStageCoprocessor.binlogEventConvertToEntry.parseRowsEventForTableMetaData((WriteRowsEvent) binLogEvent);
                    needDmlParse = true;
                    break;
                case BinLogEvent.update_rows_event_v1:
                case BinLogEvent.partial_update_rows_event:
                case BinLogEvent.update_rows_event:
                    tableMetaData = multiStageCoprocessor.binlogEventConvertToEntry.parseRowsEventForTableMetaData((UpdateRowsEvent) binLogEvent);
                    needDmlParse = true;
                    break;
                case BinLogEvent.delete_rows_event_v1:
                case BinLogEvent.delete_rows_event:
                    tableMetaData = multiStageCoprocessor.binlogEventConvertToEntry.parseRowsEventForTableMetaData((DeleteRowsEvent) binLogEvent);
                    needDmlParse = true;
                    break;
                case BinLogEvent.rows_query_log_event:
                    // Flagged for DML parsing, but no table metadata is resolved here.
                    needDmlParse = true;
                    break;
                default:
                    // Every other event type is converted to an Entry immediately.
                    Entry entry = multiStageCoprocessor.binlogEventConvertToEntry.parse(messageEvent.getBinLogEvent(), false);
                    messageEvent.setEntry(entry);
                    break;
            }

            // Record the table structure resolved for the DML event (null for non-row events).
            messageEvent.setNeedDmlParse(needDmlParse);
            messageEvent.setTableMetaData(tableMetaData);
        } catch (Throwable e) {
            // Publish the failure on the coordinator before rethrowing it.
            multiStageCoprocessor.exception = new ParseException(e);
            throw multiStageCoprocessor.exception;
        }
    }

    /** Lifecycle callback; nothing to initialise. */
    @Override
    public void onStart() {

    }

    /** Lifecycle callback; nothing to release. */
    @Override
    public void onShutdown() {

    }
}
